package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.bean.OrderWide;
import com.atguigu.gmall.realtime.bean.PaymentInfo;
import com.atguigu.gmall.realtime.bean.PaymentWide;
import com.atguigu.gmall.realtime.utils.DateTimeUtil;
import com.atguigu.gmall.realtime.utils.MyKafka;
import com.atguigu.gmall.realtime.utils.MyKafkaPro;
import com.ctc.wstx.util.DataUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.time.Duration;

//Payment wide-table application: joins the payment stream with the order wide-table stream.
public class PaymentWideApp {

    /**
     * Entry point of the payment wide-table job.
     *
     * <p>Reads the order wide-table stream ({@code dwm_order_wide}) and the payment stream
     * ({@code dwd_payment_info}) from Kafka, interval-joins them on {@code order_id}
     * (payment side drives the join, looking back up to 30 minutes for the matching order),
     * and writes the merged {@code PaymentWide} records as JSON to {@code dwm_payment_wide}.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // --- Checkpoint configuration ---
        // Trigger a checkpoint every 5 s with exactly-once semantics.
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // On failure, restart up to 5 times with a 5 s delay between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000L));
        // Set the Hadoop user BEFORE wiring up the HDFS state backend so every
        // HDFS access (checkpoint writes) runs as this user.
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        // Persist checkpoints to HDFS.
        env.setStateBackend(new FsStateBackend("hdfs://hadoop104:8020/gmall/gmall/flink/checkpoint"));
        // A checkpoint that takes longer than 60 s is considered failed.
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // Keep externalized checkpoints on cancellation so the job can be restored manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // --- Sources: read both streams from Kafka ---
        // Topic that the order wide-table job writes to.
        String orderWideTopic = "dwm_order_wide";
        String groupId = "PaymentWideApp";
        FlinkKafkaConsumer<String> orderWideSource = MyKafka.getFlinkKafkaConsumer(orderWideTopic, groupId);
        DataStreamSource<String> orderWideDStr = env.addSource(orderWideSource);
        // Payment facts topic.
        String paymentTopic = "dwd_payment_info";
        FlinkKafkaConsumer<String> paymentSource = MyKafka.getFlinkKafkaConsumer(paymentTopic, groupId);
        DataStreamSource<String> paymentDStr = env.addSource(paymentSource);

        // --- Deserialize: JSON string -> POJO ---
        SingleOutputStreamOperator<OrderWide> orderWideDStream = orderWideDStr.map(
                r -> JSON.parseObject(r, OrderWide.class)
        );
        SingleOutputStreamOperator<PaymentInfo> paymentDStream = paymentDStr.map(
                r -> JSON.parseObject(r, PaymentInfo.class)
        );

        // --- Event-time watermarks (3 s bounded out-of-orderness) ---
        // Order side: event time is the order creation time.
        SingleOutputStreamOperator<OrderWide> orderWideWithWaterMarkDStream = orderWideDStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<OrderWide>) (orderWide, recordTimestamp) ->
                                        DateTimeUtil.toTs(orderWide.getCreate_time()))
        );
        // Payment side: event time is the payment callback time.
        SingleOutputStreamOperator<PaymentInfo> paymentWithWaterMarkDStream = paymentDStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<PaymentInfo>) (paymentInfo, recordTimestamp) ->
                                        DateTimeUtil.toTs(paymentInfo.getCallback_time()))
        );

        // --- Key both streams by order_id for the interval join ---
        KeyedStream<OrderWide, Long> orderWideLongKeyedDStream = orderWideWithWaterMarkDStream.keyBy(OrderWide::getOrder_id);
        KeyedStream<PaymentInfo, Long> paymentWideLongKeyedDStream = paymentWithWaterMarkDStream.keyBy(PaymentInfo::getOrder_id);

        // --- Interval join: payment drives the join, because every payment has an
        // order, while an order may never be paid. The order must have been created
        // within the 30 minutes preceding the payment event.
        SingleOutputStreamOperator<PaymentWide> joinDStream = paymentWideLongKeyedDStream
                .intervalJoin(orderWideLongKeyedDStream)
                .between(Time.seconds(-1800), Time.seconds(0))
                .process(
                        new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                            @Override
                            public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                                out.collect(new PaymentWide(paymentInfo, orderWide));
                            }
                        }
                );

        joinDStream.print("payment-order join>>>");

        // --- Serialize back to JSON and sink to Kafka (dwm_payment_wide) ---
        SingleOutputStreamOperator<String> joinedJsonStream = joinDStream.map(JSON::toJSONString);
        String topic = "dwm_payment_wide";
        FlinkKafkaProducer<String> flinkKafkaProducer = MyKafkaPro.getFlinkKafkaProducer(topic);
        joinedJsonStream.addSink(flinkKafkaProducer);

        // Name the job so it is identifiable in the Flink web UI.
        env.execute("PaymentWideApp");
    }
}
